/**
 * FileName: DataFilterAndOperatorUtil
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/1/18 10:23
 * Description:主要包含对原始数据的过滤，和操作两部分。
 * 通过加载外部配置文件rule.json的方式读取已配置的的规则对数据进行过滤；
 * 通过加载外部配置文件action.json的方式读取已配置的行为对符合规则的数据进行操作。
 *
 *  *注意：列的序号从0列开始，即第一列应该表示为 0，在后面提到的表达式中为 C0
 *
 * (1) 每个列支持的操：LK（正则匹配） EQ（等于） NE（不等于） GT（大于） LT（小于） GE（大于等于） LE（小于等于）
 * 一个行中的列规则之间支持逻辑表达式，使用CXX表示列，第二列为C1
 * 例如: C0 && (C1 ||C2)
 *       C0 || C1 || C2
 *
 * 为了可以筛选不符合规定长度的数据（某行数据分割后会存在少列或者多列的情况）可以采用'NE'进行含蓄表达处理
 * 例如：标准的分割后的数据有5列，为了去除不规范数据的规则（从零开始计数 0列即表示第1列）
 *      "sceneID": "sce01",
 *      "rules": [
 *      {
 *      "columnIndex": "4",
 *      "operator": "NE",
 *      "value": "Impossible Equivalent Strings"
 *      },
 *      {
 *      "columnIndex": "5",
 *      "operator": "NE",
 *      "value": "Impossible Equivalent Strings"
 *      }
 *      ],
 *      "logicalExpr":"!C4 || C5",
 *      注意："value"中的字符串可以随意定义，只是便于理解建议写为 Impossible Equivalent Strings
 *
 *      ***后续变动：将配置文件中的列改为从1开始计数     状态：未变动
 *
 *
 * (2) 每个列匹配到规则后会根据相对应的规则文件进行处理，也就是说rule.json和action.json的SceneID需要对应
 * 目前对于匹配之后的操作支持：
 *  REP（替换） 将指定的列替换为新的字符串
 *  ADD（增加） 在指定列数据的基础上追加内容，默认在列后追加字符串，若需要在特定位置增加则使用'?'表示原始数据：
 *            例如：原始数据为123 在开头增加"SA" : SA?
 *  DLC（删除列） 将指定的列数据置空，但是不会删除此列
 *  DPR（消除行） 将符合规则的行数据抹除
 */
package cn.com.bonc.util;
import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.constant.OperatorEnum;
import cn.com.bonc.domain.ColumnCell;
import cn.com.bonc.domain.ColumnRule;
import com.sun.org.apache.bcel.internal.generic.IF_ACMPEQ;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Pattern;
import javax.inject.Singleton;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public class DataFilterAndOperatorUtil implements Serializable{

    private static final long serialVersionUID = -5058280680271232168L;

    /**读取出的规则数据*/
    private List<ColumnRule> rulesList;

    private DataFilterAndOperatorUtil(){
        try {
            this.rulesList=ExternalResourceUtil.loadRuleActionsJsonFileData();
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("===============================>rule-action.json配置文件加载出错!");
        }
    }

    private static class SingletonInstance{
        private static final DataFilterAndOperatorUtil instance = new DataFilterAndOperatorUtil();
    }
    public static DataFilterAndOperatorUtil getInstance(){
        return SingletonInstance.instance;
    }

    /**
     *
     * @param rule 列规则
     * @param columns 数组形式的原始数据
     * @return Boolean 返回是否匹配
     * @throws ScriptException
     */
    private Boolean matchRule(ColumnRule rule, String[] columns) throws ScriptException {

        int columnIndex = 0;//列索引
        int resultCode=2;//列数据转换成double之后的比较结果
        HashMap<Integer, Boolean>  resultMap = new HashMap<>();//每个列的匹配结果
        Boolean result = null;//最终返回结果

        for (ColumnCell columnCell : rule.getRules()) {

            String operator = columnCell.getOperator().toUpperCase();
            OperatorEnum operatorEnum =OperatorEnum.fromOperatorName(operator);
            columnIndex = columnCell.getColumnIndex();


            if (columnIndex>columns.length-1){
                //配置的列超过数据分割后数组的长度标记为false 不存在
                resultMap.put(columnIndex,false);
                operatorEnum=OperatorEnum.DEFAULT;
            }
            if (Pattern.compile("^(GT|GE|LT|LE)").matcher(operator).matches()){
                try {
                    resultCode =Double.valueOf(columns[columnIndex]).compareTo(Double.valueOf(columnCell.getValue()));
                }catch (NumberFormatException n){
                    System.out.println("===============================>无法将第"+columnIndex+"列转换为Double进行数值比较");
                    return false;
                }
            }
            switch (operatorEnum) {
                case LIKE:
                    boolean matches = Pattern.compile(columnCell.getValue())
                            .matcher(columns[columnIndex])
                            .matches();
                    resultMap.put(columnIndex,matches);
                    break;
                case EQUAL:
                    resultMap.put(columnIndex, columnCell.getValue().equals(columns[columnIndex]));
                    break;
                case NOT_EQUAL:
                    resultMap.put(columnIndex,!columnCell.getValue().equals(columns[columnIndex]));
                    break;
                case GREATER_THAN:
                    resultMap.put(columnIndex,resultCode==1?true:false);
                    break;
                case LESS_THAN:
                    resultMap.put(columnIndex,resultCode==-1?true:false);
                    break;
                case GREATER_THAN_OR_EQUAL:
                    resultMap.put(columnIndex,(resultCode==1||resultCode==0)?true:false);
                    break;
                case LESS_THAN_OR_EQUAL:
                    resultMap.put(columnIndex,(resultCode==-1||resultCode==0)?true:false);
                    break;
            }
        }

        //默认逻辑表达式且关系结果返回
        String logicalExpr =rule.getLogicalExpr();
        if (logicalExpr==null||logicalExpr.trim().equals("")){
            for (boolean b:resultMap.values()) {
                if (b==false){
                    return false;
                }
            }
            return true;
        }

        ScriptEngineManager manager = new ScriptEngineManager();
        ScriptEngine engine = manager.getEngineByName("js");
        resultMap.forEach((k,v)->engine.put("C"+k,v));
        try {
            result=(Boolean) engine.eval(rule.getLogicalExpr());
        }catch (ClassCastException c){
            System.out.println("===============================>表达式无法将最终计算结果转换为true或false");
            return false;
        }
        return result==null?false:result;
    }

    /**
     *
     * @param cellOperate 对列数据的系列操作
     * @param columns 数组形式的原始数据
     * @return
     */
    private String[] operateData(List<ColumnCell> cellOperate, String[] columns) {
        for (ColumnCell op:cellOperate) {
            int columnIndex = op.getColumnIndex();
            OperatorEnum operatorEnum =OperatorEnum.fromOperatorName(op.getOperator().toUpperCase());
            switch (operatorEnum){
                case REPLACE_STRING:
                    columns[columnIndex]=op.getValue();
                    break;
                case ADD_STRING:
                    String value = op.getValue();
                    String column = columns[columnIndex];
                    if (value.contains("?")){
                        columns[columnIndex] = value.replace("?", column);
                    }else {
                        columns[columnIndex] =column+value;
                    }
                    break;
                case DELETE_COLUMN:
                    columns[columnIndex]="";
                    break;
                case DROP_ROW:
                    columns=null;
                    break;
            }
        }
        return columns;
    }

    /**
     * 数据过滤方法，将数据按照外部配置文件进行过滤
     * @param dataset 泛型为String的dataset
     * @return
     */
    public Dataset<String> filter(Dataset<String> dataset){

        Dataset<String> resultDataset = dataset
                .map((MapFunction<String, String>) str -> filter(str),Encoders.STRING())
                .filter((FilterFunction<String>) str->!"null".equals(str));
        return resultDataset;
    }

    /**
     * 数据过滤具体实现，对字符串分割，筛选，处理。
     * 注意：当配置实现删除此字符串时，原字符串变为'null'字符，此时标志着此条记录被删除
     * @param line 行记录
     * @return
     * @throws ScriptException
     */
    public String filter(String line) throws ScriptException {
        if (!isDoRuleActionProcess()){
            return line;
        }
        String separator=ConfigurationManager.getProperty(Constants.DEFAULT_DATA_SEPARATOR);
        String[] columns=StringUtils.splitPreserveAllTokens(line,separator);
        for (ColumnRule columnRule:rulesList){
            if (matchRule(columnRule,columns)){
                //List<ColumnCell> cellOperate = actionMap.get(columnRule.getSceneID());
                List<ColumnCell> cellOperate = columnRule.getAct();
                if (cellOperate!=null){
                    columns=operateData(cellOperate,columns);
                    if (columns==null){
                        return "null";
                    }
                    //当匹配到这条规则后判断是否需要继续匹配下一条
                    if (!columnRule.isHasNextRule()){
                        break;
                    }
                }
            }
        }
        StringBuilder resultStringBuilder = new StringBuilder();
        Arrays.stream(columns).forEach(c-> resultStringBuilder.append(c+separator));
        return resultStringBuilder.deleteCharAt(resultStringBuilder.length()-1).toString();
    }

    private boolean isDoRuleActionProcess() {
        long count = rulesList.parallelStream().filter(p ->{
            if (p.getAct()!=null){
                return p.getAct().size()!=0;
            }
            return false;
        }).count();
        return count!=0;
    }
}